Browse Source

Use remote client in TransportFieldCapsAction (#30838)

We now have a remote cluster client exposed which can
talk to a given remote cluster and manages reconnects etc.
This makes code more readable than using the transport layer directly.
Simon Willnauer 7 years ago
parent
commit
8bbfdf1f45

+ 14 - 46
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -33,10 +34,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.transport.RemoteClusterService;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
@@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
     private final ClusterService clusterService;
     private final TransportFieldCapabilitiesIndexAction shardAction;
     private final RemoteClusterService remoteClusterService;
-    private final TransportService transportService;
 
     @Inject
     public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
@@ -62,7 +58,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
         this.clusterService = clusterService;
         this.remoteClusterService = transportService.getRemoteClusterService();
-        this.transportService = transportService;
         this.shardAction = shardAction;
     }
 
@@ -118,47 +113,20 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
                 String clusterAlias = remoteIndices.getKey();
                 OriginalIndices originalIndices = remoteIndices.getValue();
-                // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
-                remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-                    Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
-                    FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
-                    remoteRequest.setMergeResults(false); // we need to merge on this node
-                    remoteRequest.indicesOptions(originalIndices.indicesOptions());
-                    remoteRequest.indices(originalIndices.indices());
-                    remoteRequest.fields(request.fields());
-                    transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
-                        new TransportResponseHandler<FieldCapabilitiesResponse>() {
-
-                            @Override
-                            public FieldCapabilitiesResponse newInstance() {
-                                return new FieldCapabilitiesResponse();
-                            }
-
-                            @Override
-                            public void handleResponse(FieldCapabilitiesResponse response) {
-                                try {
-                                    for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
-                                        indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
-                                            buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
-                                    }
-                                } finally {
-                                    onResponse.run();
-                                }
-                            }
-
-                            @Override
-                            public void handleException(TransportException exp) {
-                                onResponse.run();
-                            }
-
-                            @Override
-                            public String executor() {
-                                return ThreadPool.Names.SAME;
-                            }
-                        });
-                }, e -> onResponse.run()));
+                Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
+                FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
+                remoteRequest.setMergeResults(false); // we need to merge on this node
+                remoteRequest.indicesOptions(originalIndices.indicesOptions());
+                remoteRequest.indices(originalIndices.indices());
+                remoteRequest.fields(request.fields());
+                remoteClusterClient.fieldCaps(remoteRequest,  ActionListener.wrap(response -> {
+                    for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
+                        indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
+                            buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
+                    }
+                    onResponse.run();
+                }, failure -> onResponse.run()));
             }
-
         }
     }