浏览代码

Ensure remote cluster is connected before fetching `_field_caps` (#24845)

If a cluster disconnects and comes back up we should ensure that
we connected to the cluster before we fire the requests.

Closes #24763
Simon Willnauer 8 年之前
父节点
当前提交
ac6a6d6fe8

+ 39 - 32
core/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -118,38 +118,45 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
                 String clusterAlias = remoteIndices.getKey();
                 OriginalIndices originalIndices = remoteIndices.getValue();
-                Transport.Connection connection = remoteClusterService.getConnection(remoteIndices.getKey());
-                FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
-                remoteRequest.setMergeResults(false); // we need to merge on this node
-                remoteRequest.indicesOptions(originalIndices.indicesOptions());
-                remoteRequest.indices(originalIndices.indices());
-                remoteRequest.fields(request.fields());
-                transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
-                    new TransportResponseHandler<FieldCapabilitiesResponse>() {
-                    @Override
-                    public FieldCapabilitiesResponse newInstance() {
-                        return new FieldCapabilitiesResponse();
-                    }
-
-                    @Override
-                    public void handleResponse(FieldCapabilitiesResponse response) {
-                        for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
-                            indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.buildRemoteIndexName(clusterAlias,
-                                res.getIndexName()), res.get()));
-                        }
-                        onResponse.run();
-                    }
-
-                    @Override
-                    public void handleException(TransportException exp) {
-                        onResponse.run();
-                    }
-
-                    @Override
-                    public String executor() {
-                        return ThreadPool.Names.SAME;
-                    }
-                });
+                // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
+                remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
+                    Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
+                    FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
+                    remoteRequest.setMergeResults(false); // we need to merge on this node
+                    remoteRequest.indicesOptions(originalIndices.indicesOptions());
+                    remoteRequest.indices(originalIndices.indices());
+                    remoteRequest.fields(request.fields());
+                    transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
+                        new TransportResponseHandler<FieldCapabilitiesResponse>() {
+
+                            @Override
+                            public FieldCapabilitiesResponse newInstance() {
+                                return new FieldCapabilitiesResponse();
+                            }
+
+                            @Override
+                            public void handleResponse(FieldCapabilitiesResponse response) {
+                                try {
+                                    for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
+                                        indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
+                                            buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
+                                    }
+                                } finally {
+                                    onResponse.run();
+                                }
+                            }
+
+                            @Override
+                            public void handleException(TransportException exp) {
+                                onResponse.run();
+                            }
+
+                            @Override
+                            public String executor() {
+                                return ThreadPool.Names.SAME;
+                            }
+                        });
+                }, e -> onResponse.run()));
             }
 
         }

+ 13 - 1
core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -160,12 +160,24 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             // we can't proceed with a search on a cluster level.
             // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller
             // end since they provide the listener.
-            connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
+            ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
         } else {
             fetchShardsInternal(searchRequest, listener);
         }
     }
 
+    /**
+     * Ensures that this cluster is connected. If the cluster is connected this operation
+     * will invoke the listener immediately.
+     */
+    public void ensureConnected(ActionListener<Void> voidActionListener) {
+        if (connectedNodes.isEmpty()) {
+            connectHandler.connect(voidActionListener);
+        } else {
+            voidActionListener.onResponse(null);
+        }
+    }
+
     private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
                                      final ActionListener<ClusterSearchShardsResponse> listener) {
         final DiscoveryNode node = nodeSupplier.get();

+ 14 - 0
core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -22,6 +22,7 @@ import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
@@ -46,6 +47,7 @@ import org.elasticsearch.search.internal.AliasFilter;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -265,6 +267,18 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         return connection.getConnection(node);
     }
 
+    /**
+     * Ensures that the given cluster alias is connected. If the cluster is connected this operation
+     * will invoke the listener immediately.
+     */
+    public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
+        RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
+        if (remoteClusterConnection == null) {
+            throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
+        }
+        remoteClusterConnection.ensureConnected(listener);
+    }
+
     public Transport.Connection getConnection(String cluster) {
         RemoteClusterConnection connection = remoteClusters.get(cluster);
         if (connection == null) {

+ 55 - 0
core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -22,6 +22,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.Build;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
@@ -730,4 +731,58 @@ public class RemoteClusterConnectionTests extends ESTestCase {
         }
         return statsRef.get();
     }
+
+    public void testEnsureConnected() throws IOException, InterruptedException {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
+            MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
+            DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
+            knownNodes.add(seedTransport.getLocalDiscoNode());
+            knownNodes.add(discoverableTransport.getLocalDiscoNode());
+            Collections.shuffle(knownNodes, random());
+
+            try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
+                service.start();
+                service.acceptIncomingRequests();
+                try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
+                    Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
+                    assertFalse(service.nodeConnected(seedNode));
+                    assertFalse(service.nodeConnected(discoverableNode));
+                    assertTrue(connection.assertNoRunningConnections());
+                    CountDownLatch latch = new CountDownLatch(1);
+                    connection.ensureConnected(new LatchedActionListener<>(new ActionListener<Void>() {
+                        @Override
+                        public void onResponse(Void aVoid) {
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            throw new AssertionError(e);
+                        }
+                    }, latch));
+                    latch.await();
+                    assertTrue(service.nodeConnected(seedNode));
+                    assertTrue(service.nodeConnected(discoverableNode));
+                    assertTrue(connection.assertNoRunningConnections());
+
+                    // exec again we are already connected
+                    connection.ensureConnected(new LatchedActionListener<>(new ActionListener<Void>() {
+                        @Override
+                        public void onResponse(Void aVoid) {
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            throw new AssertionError(e);
+                        }
+                    }, latch));
+                    latch.await();
+                    assertTrue(service.nodeConnected(seedNode));
+                    assertTrue(service.nodeConnected(discoverableNode));
+                    assertTrue(connection.assertNoRunningConnections());
+                }
+            }
+        }
+    }
 }

+ 38 - 0
core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -189,6 +189,44 @@ public class RemoteClusterServiceTests extends ESTestCase {
         }
     }
 
+    public void testEnsureConnected() throws IOException {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
+            DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
+            knownNodes.add(seedTransport.getLocalDiscoNode());
+            knownNodes.add(otherSeedTransport.getLocalDiscoNode());
+            Collections.shuffle(knownNodes, random());
+
+            try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
+                null)) {
+                transportService.start();
+                transportService.acceptIncomingRequests();
+                Settings.Builder builder = Settings.builder();
+                builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
+                builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
+                try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.initializeRemoteClusters();
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address()));
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(service.isRemoteClusterRegistered("cluster_1"));
+                    service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address()));
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(service.isRemoteClusterRegistered("cluster_1"));
+                    assertTrue(service.isRemoteClusterRegistered("cluster_2"));
+                    service.updateRemoteCluster("cluster_2", Collections.emptyList());
+                    assertFalse(service.isRemoteClusterRegistered("cluster_2"));
+                    IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
+                        () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
+                    assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
+                }
+            }
+        }
+    }
+
     public void testRemoteNodeAttribute() throws IOException, InterruptedException {
         final Settings settings =
                 Settings.builder().put("search.remote.node.attr", "gateway").build();