Browse Source

Add a HostFailureListener to notify client code if a node got disconnected (#21709)

Today there is no way to get notified if a node is disconnected. Client code
must poll the TransportClient constantly to detect that a node is not connected
anymore in order to react and add new nodes or notify altering etc. For instance
if a hostname  gets resolved to an IP but that host is disconnected clients want
to reconnect by resolving the hostname again which is a common situation in cloud
environments.

Closes #21424
Simon Willnauer 9 years ago
parent
commit
a9a2753f0b

+ 0 - 2
buildSrc/src/main/resources/checkstyle_suppressions.xml

@@ -223,8 +223,6 @@
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]FilterClient.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]support[/\\]AbstractClient.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]TransportClient.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]support[/\\]TransportProxyClient.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterState.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateObserver.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateUpdateTask.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]DiffableUtils.java" checks="LineLength" />

+ 24 - 1
client/transport/src/main/java/org/elasticsearch/transport/client/PreBuiltTransportClient.java

@@ -55,13 +55,36 @@ public class PreBuiltTransportClient extends TransportClient {
                             PercolatorPlugin.class,
                             MustachePlugin.class));
 
+
+    /**
+     * Creates a new transport client with pre-installed plugins.
+     * @param settings the settings passed to this transport client
+     * @param plugins an optional array of additional plugins to run with this client
+     */
     @SafeVarargs
     public PreBuiltTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
         this(settings, Arrays.asList(plugins));
     }
 
+
+    /**
+     * Creates a new transport client with pre-installed plugins.
+     * @param settings the settings passed to this transport client
+     * @param plugins a collection of additional plugins to run with this client
+     */
     public PreBuiltTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
-        super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS));
+        this(settings, plugins, null);
+    }
+
+    /**
+     * Creates a new transport client with pre-installed plugins.
+     * @param settings the settings passed to this transport client
+     * @param plugins a collection of additional plugins to run with this client
+     * @param hostFailureListener a failure listener that is invoked if a node is disconnected. This can be <code>null</code>
+     */
+    public PreBuiltTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins,
+                                   HostFailureListener hostFailureListener) {
+        super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener);
     }
 
     @Override

+ 37 - 6
core/src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.client.support.AbstractClient;
-import org.elasticsearch.client.transport.support.TransportProxyClient;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.inject.Injector;
@@ -40,6 +39,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.node.Node;
@@ -65,6 +65,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+
 /**
  * The transport client allows to create a client that is not part of the cluster, but simply connects to one
  * or more nodes directly by adding their respective addresses using {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}.
@@ -74,6 +76,15 @@ import java.util.stream.Collectors;
  */
 public abstract class TransportClient extends AbstractClient {
 
+    public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
+        Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope);
+    public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
+        Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope);
+    public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
+        Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope);
+    public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
+        Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
+
     private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
         final Settings.Builder settingsBuilder = Settings.builder()
                 .put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@@ -101,7 +112,7 @@ public abstract class TransportClient extends AbstractClient {
     }
 
     private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
-                                                Collection<Class<? extends Plugin>> plugins) {
+                                                Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
         if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
             providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
         }
@@ -164,7 +175,8 @@ public abstract class TransportClient extends AbstractClient {
 
             Injector injector = modules.createInjector();
             final TransportClientNodesService nodesService =
-                new TransportClientNodesService(settings, transportService, threadPool);
+                new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
+                    ? (t, e) -> {} : failureListner);
             final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
                 actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
 
@@ -222,7 +234,7 @@ public abstract class TransportClient extends AbstractClient {
      * Creates a new TransportClient with the given settings and plugins
      */
     public TransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
-        this(buildTemplate(settings, Settings.EMPTY, plugins));
+        this(buildTemplate(settings, Settings.EMPTY, plugins, null));
     }
 
     /**
@@ -231,8 +243,9 @@ public abstract class TransportClient extends AbstractClient {
      * @param defaultSettings default settings that are merged after the plugins have added it's additional settings.
      * @param plugins the client plugins
      */
-    protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins) {
-        this(buildTemplate(settings, defaultSettings, plugins));
+    protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins,
+                              HostFailureListener hostFailureListener) {
+        this(buildTemplate(settings, defaultSettings, plugins, hostFailureListener));
     }
 
     private TransportClient(ClientTemplate template) {
@@ -332,4 +345,22 @@ public abstract class TransportClient extends AbstractClient {
     protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
         proxy.execute(action, request, listener);
     }
+
+    /**
+     * Listener that allows to be notified whenever a node failure / disconnect happens
+     */
+    @FunctionalInterface
+    public interface HostFailureListener {
+        /**
+         * Called once a node disconnect is detected.
+         * @param node the node that has been disconnected
+         * @param ex the exception causing the disconnection
+         */
+        void onNodeDisconnected(DiscoveryNode node, Exception ex);
+    }
+
+    // pkg private for testing
+    TransportClientNodesService getNodesService() {
+        return nodesService;
+    }
 }

+ 43 - 27
core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

@@ -35,8 +35,6 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
@@ -45,6 +43,8 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.FutureTransportResponseHandler;
+import org.elasticsearch.transport.NodeDisconnectedException;
+import org.elasticsearch.transport.NodeNotConnectedException;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponseHandler;
@@ -64,9 +64,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
-
-public class TransportClientNodesService extends AbstractComponent implements Closeable {
+final class TransportClientNodesService extends AbstractComponent implements Closeable {
 
     private final TimeValue nodesSamplerInterval;
 
@@ -100,37 +98,30 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
 
     private volatile boolean closed;
 
+    private final TransportClient.HostFailureListener hostFailureListener;
 
-    public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
-        Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Property.NodeScope);
-    public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
-        Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Property.NodeScope);
-    public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
-        Setting.boolSetting("client.transport.ignore_cluster_name", false, Property.NodeScope);
-    public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
-        Setting.boolSetting("client.transport.sniff", false, Property.NodeScope);
-
-    public TransportClientNodesService(Settings settings,TransportService transportService,
-                                       ThreadPool threadPool) {
+    TransportClientNodesService(Settings settings, TransportService transportService,
+                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
         super(settings);
         this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
         this.transportService = transportService;
         this.threadPool = threadPool;
         this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
 
-        this.nodesSamplerInterval = CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
-        this.pingTimeout = CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
-        this.ignoreClusterName = CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);
+        this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
+        this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
+        this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);
 
         if (logger.isDebugEnabled()) {
             logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
         }
 
-        if (CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
+        if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
             this.nodesSampler = new SniffNodesSampler();
         } else {
             this.nodesSampler = new SimpleNodeSampler();
         }
+        this.hostFailureListener = hostFailureListener;
         this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
     }
 
@@ -224,13 +215,17 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
         }
         ensureNodesAreAvailable(nodes);
         int index = getNodeNumber();
-        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
-        DiscoveryNode node = nodes.get((index) % nodes.size());
+        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
+        DiscoveryNode node = retryListener.getNode(0);
         try {
             callback.doWithNode(node, retryListener);
         } catch (Exception e) {
-            //this exception can't come from the TransportService as it doesn't throw exception at all
-            listener.onFailure(e);
+            try {
+                //this exception can't come from the TransportService as it doesn't throw exception at all
+                listener.onFailure(e);
+            } finally {
+                retryListener.maybeNodeFailed(node, e);
+            }
         }
     }
 
@@ -239,15 +234,17 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
         private final ActionListener<Response> listener;
         private final List<DiscoveryNode> nodes;
         private final int index;
+        private final TransportClient.HostFailureListener hostFailureListener;
 
         private volatile int i;
 
         public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
-                             List<DiscoveryNode> nodes, int index) {
+                             List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
             this.callback = callback;
             this.listener = listener;
             this.nodes = nodes;
             this.index = index;
+            this.hostFailureListener = hostFailureListener;
         }
 
         @Override
@@ -257,13 +254,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
 
         @Override
         public void onFailure(Exception e) {
-            if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
+            Throwable throwable = ExceptionsHelper.unwrapCause(e);
+            if (throwable instanceof ConnectTransportException) {
+                maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable);
                 int i = ++this.i;
                 if (i >= nodes.size()) {
                     listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
                 } else {
                     try {
-                        callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
+                        callback.doWithNode(getNode(i), this);
                     } catch(final Exception inner) {
                         inner.addSuppressed(e);
                         // this exception can't come from the TransportService as it doesn't throw exceptions at all
@@ -275,7 +274,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
             }
         }
 
+        final DiscoveryNode getNode(int i) {
+            return nodes.get((index + i) % nodes.size());
+        }
 
+        final void maybeNodeFailed(DiscoveryNode node, Exception ex) {
+            if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) {
+                hostFailureListener.onNodeDisconnected(node, ex);
+            }
+        }
     }
 
     @Override
@@ -377,6 +384,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
                         logger.debug(
                             (Supplier<?>)
                                 () -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e);
+                        hostFailureListener.onNodeDisconnected(listedNode, e);
                         newFilteredNodes.add(listedNode);
                         continue;
                     }
@@ -411,6 +419,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
                     logger.info(
                         (Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
                     transportService.disconnectFromNode(listedNode);
+                    hostFailureListener.onNodeDisconnected(listedNode, e);
                 }
             }
 
@@ -489,6 +498,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
                                                     "failed to get local cluster state for {}, disconnecting...", listedNode), e);
                                             transportService.disconnectFromNode(listedNode);
                                             latch.countDown();
+                                            hostFailureListener.onNodeDisconnected(listedNode, e);
                                         }
                                     });
                         } catch (Exception e) {
@@ -497,6 +507,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
                                     "failed to get local cluster state info for {}, disconnecting...", listedNode), e);
                             transportService.disconnectFromNode(listedNode);
                             latch.countDown();
+                            hostFailureListener.onNodeDisconnected(listedNode, e);
                         }
                     }
                 });
@@ -531,4 +542,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
 
         void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
     }
+
+    // pkg private for testing
+    void doSample() {
+        nodesSampler.doSample();
+    }
 }

+ 6 - 7
core/src/main/java/org/elasticsearch/client/transport/support/TransportProxyClient.java → core/src/main/java/org/elasticsearch/client/transport/TransportProxyClient.java

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.client.transport.support;
+package org.elasticsearch.client.transport;
 
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
@@ -26,9 +26,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.GenericAction;
 import org.elasticsearch.action.TransportActionNodeProxy;
-import org.elasticsearch.client.transport.TransportClientNodesService;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.transport.TransportService;
 
@@ -38,12 +35,12 @@ import java.util.Map;
 
 import static java.util.Collections.unmodifiableMap;
 
-public class TransportProxyClient {
+final class TransportProxyClient {
 
     private final TransportClientNodesService nodesService;
     private final Map<Action, TransportActionNodeProxy> proxies;
 
-    public TransportProxyClient(Settings settings, TransportService transportService,
+    TransportProxyClient(Settings settings, TransportService transportService,
                                 TransportClientNodesService nodesService, List<GenericAction> actions) {
         this.nodesService = nodesService;
         Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
@@ -55,7 +52,9 @@ public class TransportProxyClient {
         this.proxies = unmodifiableMap(proxies);
     }
 
-    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
+    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
+        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
+                                                                              final Request request, ActionListener<Response> listener) {
         final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
         nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
     }

+ 8 - 4
core/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -143,10 +143,12 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
     private volatile RoutingNodes routingNodes;
 
     public ClusterState(long version, String stateUUID, ClusterState state) {
-        this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(), false);
+        this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(),
+            false);
     }
 
-    public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs, boolean wasReadFromDiff) {
+    public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
+                        DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs, boolean wasReadFromDiff) {
         this.version = version;
         this.stateUUID = stateUUID;
         this.clusterName = clusterName;
@@ -272,12 +274,14 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
     }
 
     /**
-     * a cluster state supersedes another state iff they are from the same master and the version this state is higher thant the other state.
+     * a cluster state supersedes another state iff they are from the same master and the version this state is higher thant the other
+     * state.
      * <p>
      * In essence that means that all the changes from the other cluster state are also reflected by the current one
      */
     public boolean supersedes(ClusterState other) {
-        return this.nodes().getMasterNodeId() != null && this.nodes().getMasterNodeId().equals(other.nodes().getMasterNodeId()) && this.version() > other.version();
+        return this.nodes().getMasterNodeId() != null && this.nodes().getMasterNodeId().equals(other.nodes().getMasterNodeId())
+            && this.version() > other.version();
 
     }
 

+ 5 - 5
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -25,7 +25,7 @@ import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
 import org.elasticsearch.bootstrap.BootstrapSettings;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClientNodesService;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.InternalClusterInfoService;
@@ -162,11 +162,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
 
     public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(
             Arrays.asList(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
-                    TransportClientNodesService.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL, // TODO these transport client settings are kind
+                    TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL, // TODO these transport client settings are kind
                     // of odd here and should only be valid if we are a transport client
-                    TransportClientNodesService.CLIENT_TRANSPORT_PING_TIMEOUT,
-                    TransportClientNodesService.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME,
-                    TransportClientNodesService.CLIENT_TRANSPORT_SNIFF,
+                    TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT,
+                    TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME,
+                    TransportClient.CLIENT_TRANSPORT_SNIFF,
                     AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
                     BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
                     BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,

+ 78 - 0
core/src/test/java/org/elasticsearch/client/transport/NodeDisconnectIT.java

@@ -0,0 +1,78 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.transport;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.MockTransportClient;
+import org.elasticsearch.transport.TransportService;
+import org.hamcrest.Matchers;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+public class NodeDisconnectIT  extends ESIntegTestCase {
+
+    public void testNotifyOnDisconnect() throws IOException {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+
+        final Set<DiscoveryNode> disconnectedNodes = Collections.synchronizedSet(new HashSet<>());
+        try (TransportClient client = new MockTransportClient(Settings.builder()
+            .put("cluster.name", internalCluster().getClusterName()).build(), Collections.emptySet(), (n, e) -> disconnectedNodes.add(n))) {
+            for (TransportService service : internalCluster().getInstances(TransportService.class)) {
+                client.addTransportAddress(service.boundAddress().publishAddress());
+            }
+            internalCluster().stopRandomDataNode();
+            for (int i = 0; i < 20; i++) { // fire up requests such that we hit the node and pass it to the listener
+                client.admin().cluster().prepareState().get();
+            }
+            assertEquals(1, disconnectedNodes.size());
+        }
+        assertEquals(1, disconnectedNodes.size());
+    }
+
+    public void testNotifyOnDisconnectInSniffer() throws IOException {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+
+        final Set<DiscoveryNode> disconnectedNodes = Collections.synchronizedSet(new HashSet<>());
+        try (TransportClient client = new MockTransportClient(Settings.builder()
+            .put("cluster.name", internalCluster().getClusterName()).build(), Collections.emptySet(), (n, e) -> disconnectedNodes.add(n))) {
+            int numNodes = 0;
+            for (TransportService service : internalCluster().getInstances(TransportService.class)) {
+                numNodes++;
+                client.addTransportAddress(service.boundAddress().publishAddress());
+            }
+            Set<TransportAddress> discoveryNodes = client.connectedNodes().stream().map(n -> n.getAddress()).collect(Collectors.toSet());
+            assertEquals(numNodes, discoveryNodes.size());
+            assertEquals(0, disconnectedNodes.size());
+            internalCluster().stopRandomDataNode();
+            client.getNodesService().doSample();
+            assertEquals(1, disconnectedNodes.size());
+            assertTrue(discoveryNodes.contains(disconnectedNodes.stream().findAny().get().getAddress()));
+        }
+        assertEquals(1, disconnectedNodes.size());
+    }
+}

+ 1 - 1
core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

@@ -101,7 +101,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
             transportService.start();
             transportService.acceptIncomingRequests();
             transportClientNodesService =
-                    new TransportClientNodesService(settings, transportService, threadPool);
+                    new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
             this.nodesCount = randomIntBetween(1, 10);
             for (int i = 0; i < nodesCount; i++) {
                 TransportAddress transportAddress = buildNewFakeTransportAddress();

+ 5 - 1
test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java

@@ -37,7 +37,11 @@ public class MockTransportClient extends TransportClient {
     }
 
     public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
-        super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins));
+        this(settings, addMockTransportIfMissing(plugins), null);
+    }
+
+    public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins, HostFailureListener listener) {
+        super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins), listener);
     }
 
     private static Collection<Class<? extends Plugin>> addMockTransportIfMissing(Collection<Class<? extends Plugin>> plugins) {