Browse Source

Add cross-cluster search remote cluster info API (#23969)

This commit adds an API to discover information like seed nodes,
http addresses and connection status of a configured remote cluster.

Closes #23925
Simon Willnauer 8 years ago
parent
commit
f22e0dc30b

+ 4 - 1
core/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -173,6 +173,7 @@ import org.elasticsearch.action.main.MainAction;
 import org.elasticsearch.action.main.TransportMainAction;
 import org.elasticsearch.action.search.ClearScrollAction;
 import org.elasticsearch.action.search.MultiSearchAction;
+import org.elasticsearch.action.search.RemoteClusterService;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchScrollAction;
 import org.elasticsearch.action.search.TransportClearScrollAction;
@@ -235,6 +236,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction;
 import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
 import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction;
 import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction;
+import org.elasticsearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
 import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
 import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
 import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
@@ -500,7 +502,7 @@ public class ActionModule extends AbstractModule {
         return unmodifiableList(actionPlugins.stream().flatMap(p -> p.getActionFilters().stream()).collect(Collectors.toList()));
     }
 
-    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
+    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, RemoteClusterService remoteClusterService) {
         List<AbstractCatAction> catActions = new ArrayList<>();
         Consumer<RestHandler> registerHandler = a -> {
             if (a instanceof AbstractCatAction) {
@@ -509,6 +511,7 @@ public class ActionModule extends AbstractModule {
         };
         registerHandler.accept(new RestMainAction(settings, restController));
         registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
+        registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController, remoteClusterService));
         registerHandler.accept(new RestNodesStatsAction(settings, restController));
         registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
         registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));

+ 74 - 0
core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java

@@ -23,6 +23,10 @@ import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
@@ -33,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -54,8 +59,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -65,6 +72,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -521,4 +529,70 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
         return connectedNodes.contains(node);
     }
 
+
+    /**
+     * Fetches connection info for this connection
+     */
+    public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
+        final Optional<DiscoveryNode> anyNode = connectedNodes.stream().findAny();
+        if (anyNode.isPresent() == false) {
+            // not connected we return immediately
+            RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
+                Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
+                RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
+            listener.onResponse(remoteConnectionStats);
+        } else {
+            NodesInfoRequest request = new NodesInfoRequest();
+            request.clear();
+            request.http(true);
+
+            transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler<NodesInfoResponse>() {
+                @Override
+                public NodesInfoResponse newInstance() {
+                    return new NodesInfoResponse();
+                }
+
+                @Override
+                public void handleResponse(NodesInfoResponse response) {
+                    Collection<TransportAddress> httpAddresses = new HashSet<>();
+                    for (NodeInfo info : response.getNodes()) {
+                        if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) {
+                            httpAddresses.add(info.getHttp().getAddress().publishAddress());
+                        }
+                    }
+
+                    if (httpAddresses.size() < maxNumRemoteConnections) {
+                        // just in case non of the connected nodes have http enabled we get other http enabled nodes instead.
+                        for (NodeInfo info : response.getNodes()) {
+                            if (nodePredicate.test(info.getNode()) && info.getHttp() != null) {
+                                httpAddresses.add(info.getHttp().getAddress().publishAddress());
+                            }
+                            if (httpAddresses.size() == maxNumRemoteConnections) {
+                                break; // once we have enough return...
+                            }
+                        }
+                    }
+                    RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
+                        seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toSet()), httpAddresses, maxNumRemoteConnections,
+                        connectedNodes.size(), RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
+                    listener.onResponse(remoteConnectionInfo);
+                }
+
+                @Override
+                public void handleException(TransportException exp) {
+                    listener.onFailure(exp);
+                }
+
+                @Override
+                public String executor() {
+                    return ThreadPool.Names.SAME;
+                }
+            });
+        }
+
+    }
+
+    int getNumNodesConnected() {
+        return connectedNodes.size();
+    }
 }

+ 15 - 1
core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java

@@ -24,9 +24,9 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.PlainShardIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.common.Booleans;
@@ -51,6 +51,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -413,4 +414,17 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
     public void close() throws IOException {
         IOUtils.close(remoteClusters.values());
     }
+
+    public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
+        final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
+        if (remoteClusters.isEmpty()) {
+            listener.onResponse(Collections.emptyList());
+        } else {
+            final GroupedActionListener<RemoteConnectionInfo> actionListener = new GroupedActionListener<>(listener,
+                remoteClusters.size(), Collections.emptyList());
+            for (RemoteClusterConnection connection : remoteClusters.values()) {
+                connection.getConnectionInfo(actionListener);
+            }
+        }
+    }
 }

+ 75 - 0
core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.search;
+
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * This class encapsulates all remote cluster information to be rendered on
+ * <tt>_remote/info</tt> requests.
+ */
+public final class RemoteConnectionInfo implements ToXContent {
+    final Collection<TransportAddress> seedNodes;
+    final Collection<TransportAddress> httpAddresses;
+    final int connectionsPerCluster;
+    final TimeValue initialConnectionTimeout;
+    final int numNodesConnected;
+    final String clusterAlias;
+
+    RemoteConnectionInfo(String clusterAlias, Collection<TransportAddress> seedNodes,
+                         Collection<TransportAddress> httpAddresses,
+                         int connectionsPerCluster, int numNodesConnected,
+                         TimeValue initialConnectionTimeout) {
+        this.clusterAlias = clusterAlias;
+        this.seedNodes = seedNodes;
+        this.httpAddresses = httpAddresses;
+        this.connectionsPerCluster = connectionsPerCluster;
+        this.numNodesConnected = numNodesConnected;
+        this.initialConnectionTimeout = initialConnectionTimeout;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(clusterAlias);
+        {
+            builder.startArray("seeds");
+            for (TransportAddress addr : seedNodes) {
+                builder.value(addr.toString());
+            }
+            builder.endArray();
+            builder.startArray("http_addresses");
+            for (TransportAddress addr : httpAddresses) {
+                builder.value(addr.toString());
+            }
+            builder.endArray();
+            builder.field("connected", numNodesConnected > 0);
+            builder.field("num_nodes_connected", numNodesConnected);
+            builder.field("max_connections_per_cluster", connectionsPerCluster);
+            builder.field("initial_connect_timeout", initialConnectionTimeout);
+        }
+        builder.endObject();
+        return builder;
+    }
+}

+ 81 - 0
core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

@@ -0,0 +1,81 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.CountDown;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An action listener that delegates it's results to another listener once
+ * it has received one or more failures or N results. This allows synchronous
+ * tasks to be forked off in a loop with the same listener and respond to a
+ * higher level listener once all tasks responded.
+ */
+public final class GroupedActionListener<T> implements ActionListener<T> {
+    private final CountDown countDown;
+    private final AtomicInteger pos = new AtomicInteger();
+    private final AtomicArray<T> roles;
+    private final ActionListener<Collection<T>> delegate;
+    private final Collection<T> defaults;
+    private final AtomicReference<Exception> failure = new AtomicReference<>();
+
+    /**
+     * Creates a new listener
+     * @param delegate the delegate listener
+     * @param groupSize the group size
+     */
+    public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
+                                 Collection<T> defaults) {
+        roles = new AtomicArray<>(groupSize);
+        countDown = new CountDown(groupSize);
+        this.delegate = delegate;
+        this.defaults = defaults;
+    }
+
+    @Override
+    public void onResponse(T element) {
+        roles.set(pos.incrementAndGet() - 1, element);
+        if (countDown.countDown()) {
+            if (failure.get() != null) {
+                delegate.onFailure(failure.get());
+            } else {
+                List<T> collect = this.roles.asList();
+                collect.addAll(defaults);
+                delegate.onResponse(Collections.unmodifiableList(collect));
+            }
+        }
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        if (failure.compareAndSet(null, e) == false) {
+            failure.get().addSuppressed(e);
+        }
+        if (countDown.countDown()) {
+            delegate.onFailure(failure.get());
+        }
+    }
+}

+ 4 - 3
core/src/main/java/org/elasticsearch/node/Node.java

@@ -406,6 +406,8 @@ public class Node implements Closeable {
             final Transport transport = networkModule.getTransportSupplier().get();
             final TransportService transportService = newTransportService(settings, transport, threadPool,
                 networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
+            final SearchTransportService searchTransportService =  new SearchTransportService(settings,
+                settingsModule.getClusterSettings(), transportService);
             final Consumer<Binder> httpBind;
             final HttpServerTransport httpServerTransport;
             if (networkModule.isHttpEnabled()) {
@@ -447,8 +449,7 @@ public class Node implements Closeable {
                     b.bind(IndicesService.class).toInstance(indicesService);
                     b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
                         threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
-                    b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
-                            settingsModule.getClusterSettings(), transportService));
+                    b.bind(SearchTransportService.class).toInstance(searchTransportService);
                     b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
                             scriptModule.getScriptService()));
                     b.bind(Transport.class).toInstance(transport);
@@ -485,7 +486,7 @@ public class Node implements Closeable {
 
             if (NetworkModule.HTTP_ENABLED.get(settings)) {
                 logger.debug("initializing HTTP handlers ...");
-                actionModule.initRestHandlers(() -> clusterService.state().nodes());
+                actionModule.initRestHandlers(() -> clusterService.state().nodes(), searchTransportService.getRemoteClusterService());
             }
             logger.info("initialized");
 

+ 74 - 0
core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestRemoteClusterInfoAction.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster;
+
+import org.elasticsearch.action.search.RemoteClusterService;
+import org.elasticsearch.action.search.RemoteConnectionInfo;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.BytesRestResponse;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestResponseListener;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public final class RestRemoteClusterInfoAction extends BaseRestHandler {
+
+    private final RemoteClusterService remoteClusterService;
+
+    public RestRemoteClusterInfoAction(Settings settings, RestController controller,
+                                       RemoteClusterService remoteClusterService) {
+        super(settings);
+        controller.registerHandler(GET, "_remote/info", this);
+        this.remoteClusterService = remoteClusterService;
+    }
+
+    @Override
+    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client)
+        throws IOException {
+        return channel -> remoteClusterService.getRemoteConnectionInfos(
+            new RestResponseListener<Collection<RemoteConnectionInfo>>(channel) {
+            @Override
+            public RestResponse buildResponse(
+                Collection<RemoteConnectionInfo> remoteConnectionInfos) throws Exception {
+                try (XContentBuilder xContentBuilder = channel.newBuilder()) {
+                    xContentBuilder.startObject();
+                    for (RemoteConnectionInfo info : remoteConnectionInfos) {
+                        info.toXContent(xContentBuilder, request);
+                    }
+                    xContentBuilder.endObject();
+                    return new BytesRestResponse(RestStatus.OK, xContentBuilder);
+                }
+            }
+        });
+    }
+    @Override
+    public boolean canTripCircuitBreaker() {
+        return false;
+    }
+}

+ 3 - 3
core/src/test/java/org/elasticsearch/action/ActionModuleTests.java

@@ -113,7 +113,7 @@ public class ActionModuleTests extends ESTestCase {
         ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
                 settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null,
                 null);
-        actionModule.initRestHandlers(null);
+        actionModule.initRestHandlers(null, null);
         // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
         Exception e = expectThrows(IllegalArgumentException.class, () ->
             actionModule.getRestController().registerHandler(Method.GET, "/", null));
@@ -135,7 +135,7 @@ public class ActionModuleTests extends ESTestCase {
             ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
                     settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
                     singletonList(dupsMainAction), null, null);
-            Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null));
+            Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
             assertThat(e.getMessage(), startsWith("Path [/] already has a value [" + RestMainAction.class.getName()));
         } finally {
             threadPool.shutdown();
@@ -166,7 +166,7 @@ public class ActionModuleTests extends ESTestCase {
             ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
                     settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
                     singletonList(registersFakeHandler), null, null);
-            actionModule.initRestHandlers(null);
+            actionModule.initRestHandlers(null, null);
             // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
             Exception e = expectThrows(IllegalArgumentException.class, () ->
                 actionModule.getRestController().registerHandler(Method.GET, "/_dummy", null));

+ 130 - 4
core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java

@@ -19,8 +19,13 @@
 package org.elasticsearch.action.search;
 
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.Build;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
@@ -34,24 +39,28 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
-import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.http.HttpInfo;
 import org.elasticsearch.mocksocket.MockServerSocket;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportConnectionListener;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.channels.AlreadyConnectedException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -519,4 +528,121 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             }
         }
     }
+
+    private static void installNodeStatsHandler(TransportService service, DiscoveryNode...nodes) {
+        service.registerRequestHandler(NodesInfoAction.NAME, NodesInfoRequest::new, ThreadPool.Names.SAME, false, false,
+            (request, channel) -> {
+                List<NodeInfo> nodeInfos = new ArrayList<>();
+                int port = 80;
+                for (DiscoveryNode node : nodes) {
+                    HttpInfo http = new HttpInfo(new BoundTransportAddress(new TransportAddress[]{node.getAddress()},
+                        new TransportAddress(node.getAddress().address().getAddress(), port++)), 100);
+                    nodeInfos.add(new NodeInfo(node.getVersion(), Build.CURRENT, node, null, null, null, null, null, null, http, null,
+                        null, null));
+                }
+                channel.sendResponse(new NodesInfoResponse(ClusterName.DEFAULT, nodeInfos, Collections.emptyList()));
+            });
+
+    }
+
+    public void testGetConnectionInfo() throws Exception {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT);
+             MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT);
+             MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode node1 = transport1.getLocalDiscoNode();
+            DiscoveryNode node2 = transport3.getLocalDiscoNode();
+            DiscoveryNode node3 = transport2.getLocalDiscoNode();
+            knownNodes.add(transport1.getLocalDiscoNode());
+            knownNodes.add(transport3.getLocalDiscoNode());
+            knownNodes.add(transport2.getLocalDiscoNode());
+            Collections.shuffle(knownNodes, random());
+            List<DiscoveryNode> seedNodes = Arrays.asList(node3, node1, node2);
+            Collections.shuffle(seedNodes, random());
+
+            try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
+                service.start();
+                service.acceptIncomingRequests();
+                int maxNumConnections = randomIntBetween(1, 5);
+                try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
+                    seedNodes, service, maxNumConnections, n -> true)) {
+                    // test no nodes connected
+                    RemoteConnectionInfo remoteConnectionInfo = getRemoteConnectionInfo(connection);
+                    assertNotNull(remoteConnectionInfo);
+                    assertEquals(0, remoteConnectionInfo.numNodesConnected);
+                    assertEquals(0, remoteConnectionInfo.seedNodes.size());
+                    assertEquals(0, remoteConnectionInfo.httpAddresses.size());
+                    assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster);
+                    assertEquals("test-cluster", remoteConnectionInfo.clusterAlias);
+                    updateSeedNodes(connection, seedNodes);
+                    expectThrows(RemoteTransportException.class, () -> getRemoteConnectionInfo(connection));
+
+                    for (MockTransportService s : Arrays.asList(transport1, transport2, transport3)) {
+                        installNodeStatsHandler(s, node1, node2, node3);
+                    }
+
+                    remoteConnectionInfo = getRemoteConnectionInfo(connection);
+                    assertNotNull(remoteConnectionInfo);
+                    assertEquals(connection.getNumNodesConnected(), remoteConnectionInfo.numNodesConnected);
+                    assertEquals(Math.min(3, maxNumConnections), connection.getNumNodesConnected());
+                    assertEquals(3, remoteConnectionInfo.seedNodes.size());
+                    assertEquals(remoteConnectionInfo.httpAddresses.size(), Math.min(3, maxNumConnections));
+                    assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster);
+                    assertEquals("test-cluster", remoteConnectionInfo.clusterAlias);
+                    for (TransportAddress address : remoteConnectionInfo.httpAddresses) {
+                        assertTrue("port range mismatch: " + address.getPort(), address.getPort() >= 80 && address.getPort() <= 90);
+                    }
+                }
+            }
+        }
+    }
+
+    public void testRenderConnectionInfoXContent() throws IOException {
+        RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
+            Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)),
+            Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)),
+            4, 3, TimeValue.timeValueMinutes(30));
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        stats.toXContent(builder, null);
+        builder.endObject();
+        assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," +
+            "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"}}", builder.string());
+
+        stats = new RemoteConnectionInfo("some_other_cluster",
+            Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
+            Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)),
+            2, 0, TimeValue.timeValueSeconds(30));
+        builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        stats.toXContent(builder, null);
+        builder.endObject();
+        assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"],"
+                + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"}}",
+            builder.string());
+    }
+
+    private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception {
+        AtomicReference<RemoteConnectionInfo> statsRef = new AtomicReference<>();
+        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        connection.getConnectionInfo(new ActionListener<RemoteConnectionInfo>() {
+            @Override
+            public void onResponse(RemoteConnectionInfo remoteConnectionInfo) {
+                statsRef.set(remoteConnectionInfo);
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                exceptionRef.set(e);
+                latch.countDown();
+            }
+        });
+        latch.await();
+        if (exceptionRef.get() != null) {
+            throw exceptionRef.get();
+        }
+        return statsRef.get();
+    }
 }

+ 124 - 0
core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class GroupedActionListenerTests extends ESTestCase {
+
+    public void testNotifications() throws InterruptedException {
+        AtomicReference<Collection<Integer>> resRef = new AtomicReference<>();
+        ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() {
+            @Override
+            public void onResponse(Collection<Integer> integers) {
+                resRef.set(integers);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError(e);
+            }
+        };
+        final int groupSize = randomIntBetween(10, 1000);
+        AtomicInteger count = new AtomicInteger();
+        Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) :
+            Collections.emptyList();
+        GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize,
+            defaults);
+        int numThreads = randomIntBetween(2, 5);
+        Thread[] threads = new Thread[numThreads];
+        CyclicBarrier barrier = new CyclicBarrier(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            threads[i] = new Thread()  {
+                @Override
+                public void run() {
+                    try {
+                        barrier.await(10, TimeUnit.SECONDS);
+                    } catch (Exception e) {
+                        throw new AssertionError(e);
+                    }
+                    int c = 0;
+                    while((c = count.incrementAndGet()) <= groupSize) {
+                        listener.onResponse(c-1);
+                    }
+                }
+            };
+            threads[i].start();
+        }
+        for (Thread t : threads) {
+            t.join();
+        }
+        assertNotNull(resRef.get());
+        ArrayList<Integer> list = new ArrayList<>(resRef.get());
+        Collections.sort(list);
+        int expectedSize = groupSize + defaults.size();
+        assertEquals(expectedSize, resRef.get().size());
+        int expectedValue = defaults.isEmpty() ? 0 : -1;
+        for (int i = 0; i < expectedSize; i++) {
+            assertEquals(Integer.valueOf(expectedValue++), list.get(i));
+        }
+    }
+
+    public void testFailed() {
+        AtomicReference<Collection<Integer>> resRef = new AtomicReference<>();
+        AtomicReference<Exception> excRef = new AtomicReference<>();
+
+        ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() {
+            @Override
+            public void onResponse(Collection<Integer> integers) {
+                resRef.set(integers);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                excRef.set(e);
+            }
+        };
+        Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) :
+            Collections.emptyList();
+        int size = randomIntBetween(3, 4);
+        GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size,
+            defaults);
+        listener.onResponse(0);
+        IOException ioException = new IOException();
+        RuntimeException rtException = new RuntimeException();
+        listener.onFailure(rtException);
+        listener.onFailure(ioException);
+        if (size == 4) {
+            listener.onResponse(2);
+        }
+        assertNotNull(excRef.get());
+        assertEquals(rtException, excRef.get());
+        assertEquals(1, excRef.get().getSuppressed().length);
+        assertEquals(ioException, excRef.get().getSuppressed()[0]);
+        assertNull(resRef.get());
+        listener.onResponse(1);
+        assertNull(resRef.get());
+    }
+}

+ 35 - 0
docs/reference/cluster/remote-info.asciidoc

@@ -0,0 +1,35 @@
+[[cluster-remote-info]]
+== Remote Cluster Info
+
+The cluster remote info API allows to retrieve all of the configured
+remote cluster information.
+
+[source,js]
+----------------------------------
+GET /_remote/info
+----------------------------------
+// CONSOLE
+
+This command returns returns connection and endpoint information keyed by
+the configured remote cluster alias.
+
+[float]
+[[connection-info]]
+
+`seeds`::
+	The configured initial seed transport addresses of the remote cluster.
+
+`http_addresses`::
+	The published http addresses of all connected remote nodes.
+
+`connected`::
+	True if there is at least one connection to the remote cluster.
+
+`num_nodes_connected`::
+    The number of connected nodes in the remote cluster.
+
+`max_connection_per_cluster`::
+	The maximum number of connections maintained for the remote cluster.
+
+`initial_connect_timeout`::
+	The initial connect timeout for remote cluster connections.

+ 48 - 0
qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yaml

@@ -0,0 +1,48 @@
+---
+"Fetch remote cluster info for existing cluster":
+
+  - do:
+      remote.info: {}
+  - match: { my_remote_cluster.connected: true }
+  - match: { my_remote_cluster.num_nodes_connected: 1}
+  - match: { my_remote_cluster.max_connections_per_cluster: 1}
+  - match: { my_remote_cluster.initial_connect_timeout: "30s" }
+  - is_true: my_remote_cluster.http_addresses.0
+
+---
+"Add transient remote cluster based on the preset cluster and check remote info":
+  - do:
+      cluster.get_settings:
+        include_defaults: true
+
+  - set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
+
+  - do:
+      cluster.put_settings:
+        flat_settings: true
+        body:
+          transient:
+            search.remote.test_remote_cluster.seeds: $remote_ip
+
+  - match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
+
+  - do:
+      remote.info: {}
+  - set: { my_remote_cluster.http_addresses.0: remote_http }
+  - match: { test_remote_cluster.http_addresses.0: $remote_http }
+
+  - match: { test_remote_cluster.connected: true }
+  - match: { my_remote_cluster.connected: true }
+
+  - match: { test_remote_cluster.seeds.0: $remote_ip }
+  - match: { my_remote_cluster.seeds.0: $remote_ip }
+
+  - match: { my_remote_cluster.num_nodes_connected: 1}
+  - match: { test_remote_cluster.num_nodes_connected: 1}
+
+  - match: { my_remote_cluster.max_connections_per_cluster: 1}
+  - match: { test_remote_cluster.max_connections_per_cluster: 1}
+
+  - match: { my_remote_cluster.initial_connect_timeout: "30s" }
+  - match: { test_remote_cluster.initial_connect_timeout: "30s" }
+

+ 12 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/remote.info.json

@@ -0,0 +1,12 @@
+{
+  "remote.info": {
+    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/remote-info.html",
+    "methods": ["GET"],
+    "url": {
+      "path": "/_remote/info",
+      "paths": ["/_remote/info"],
+      "params": {}
+    },
+    "body": null
+  }
+}

+ 9 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/remote.info/10_info.yaml

@@ -0,0 +1,9 @@
+---
+"Get an empty emote info":
+  - skip:
+      version: " - 5.99.99"
+      reason: this API doesn't exist in 5.x yet
+  - do:
+      remote.info: {}
+  - is_true: ''
+