Simon Willnauer 8 rokov pred
rodič
commit
0183b0c5a8

+ 86 - 3
core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java

@@ -18,18 +18,26 @@
  */
 package org.elasticsearch.action.search;
 
+import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.PlainShardIterator;
+import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportService;
@@ -40,6 +48,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -48,9 +57,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
-
+//nocommit this class needs more javadocs and must be unittested
 public final class RemoteClusterService extends AbstractComponent implements Closeable {
 
     /**
@@ -81,6 +91,9 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
      */
     public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node_attribute",
         Setting.Property.NodeScope);
+
+    private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = '|';
+
     private final TransportService transportService;
     private final int numRemoteConnections;
     private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
@@ -91,6 +104,11 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
         numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
     }
 
+    /**
+     * This method updates the list of remote clusters. it's intendet to be used as a update consumer on the settings infrastructure
+     * @param seedSettings the group settings returned from {@link #REMOTE_CLUSTERS_SEEDS}
+     * @param connectionListener a listener invoked once every configured cluster has been connected to
+     */
     void updateRemoteClusters(Settings seedSettings, ActionListener<Void> connectionListener) {
         Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
         Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(seedSettings);
@@ -139,6 +157,30 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
         return remoteClusters.isEmpty() == false;
     }
 
+    public String[] filterIndices(Map<String, List<String>> perClusterIndices, String... requestIndices) {
+        List<String> localIndicesList = new ArrayList<>();
+        for (String index : requestIndices) {
+            int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
+            if (i >= 0) {
+                String remoteCluster = index.substring(0, i);
+                if (isRemoteClusterRegistered(remoteCluster)) {
+                    String remoteIndex = index.substring(i + 1);
+                    List<String> indices = perClusterIndices.get(remoteCluster);
+                    if (indices == null) {
+                        indices = new ArrayList<>();
+                        perClusterIndices.put(remoteCluster, indices);
+                    }
+                    indices.add(remoteIndex);
+                } else {
+                    localIndicesList.add(index);
+                }
+            } else {
+                localIndicesList.add(index);
+            }
+        }
+        return localIndicesList.toArray(new String[localIndicesList.size()]);
+}
+
     /**
      * Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
      */
@@ -146,8 +188,8 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
         return remoteClusters.containsKey(clusterName);
     }
 
-    void sendSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
-                          ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
+    void collectSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
+                             ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
         final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
         final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
         final AtomicReference<TransportException> transportException = new AtomicReference<>();
@@ -191,6 +233,47 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
         }
     }
 
+
+    Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
+                                                                       List<ShardIterator> remoteShardIterators,
+                                                                       Map<String, AliasFilter> aliasFilterMap) {
+        Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
+        for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
+            String clusterName = entry.getKey();
+            ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
+            for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
+                nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterName));
+            }
+            Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
+            for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
+                //add the cluster name to the remote index names for indices disambiguation
+                //this ends up in the hits returned with the search response
+                ShardId shardId = clusterSearchShardsGroup.getShardId();
+                Index remoteIndex = shardId.getIndex();
+                Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID());
+                ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()),
+                    Arrays.asList(clusterSearchShardsGroup.getShards()));
+                remoteShardIterators.add(shardIterator);
+                AliasFilter aliasFilter;
+                if (indicesAndFilters == null) {
+                    aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
+                } else {
+                    aliasFilter = indicesAndFilters.get(shardId.getIndexName());
+                    assert aliasFilter != null;
+                }
+                // here we have to map the filters to the UUID since from now on we use the uuid for the lookup
+                aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
+            }
+        }
+        return (nodeId) -> {
+            Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
+            if (supplier == null) {
+                throw new IllegalArgumentException("unknown remote node: " + nodeId);
+            }
+            return supplier.get();
+        };
+    }
+
     /**
      * Returns a connection to the given node on the given remote cluster
      * @throws IllegalArgumentException if the remote cluster is unknown

+ 7 - 81
core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -19,10 +19,7 @@
 
 package org.elasticsearch.action.search;
 
-import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
-import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -31,17 +28,13 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.cluster.routing.PlainShardIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
@@ -51,10 +44,8 @@ import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -70,8 +61,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
             "action.search.shard_count.limit", 1000L, 1L, Property.Dynamic, Property.NodeScope);
 
-    private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = '|';
-
     private final ClusterService clusterService;
     private final SearchTransportService searchTransportService;
     private final RemoteClusterService remoteClusterService;
@@ -124,7 +113,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                 concreteIndexBoosts.putIfAbsent(concreteIndex.getUUID(), ib.getBoost());
             }
         }
-
         return Collections.unmodifiableMap(concreteIndexBoosts);
     }
 
@@ -132,32 +120,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
         // pure paranoia if time goes backwards we are at least positive
         final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
-
         final String[] localIndices;
-        final Map<String, List<String>> remoteIndicesByCluster = new HashMap<>();
+        final Map<String, List<String>> remoteIndicesByCluster;
         if (remoteClusterService.isCrossClusterSearchEnabled()) {
-            List<String> localIndicesList = new ArrayList<>();
-            for (String index : searchRequest.indices()) {
-                int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
-                if (i >= 0) {
-                    String remoteCluster = index.substring(0, i);
-                    if (remoteClusterService.isRemoteClusterRegistered(remoteCluster)) {
-                        String remoteIndex = index.substring(i + 1);
-                        List<String> indices = remoteIndicesByCluster.get(remoteCluster);
-                        if (indices == null) {
-                            indices = new ArrayList<>();
-                            remoteIndicesByCluster.put(remoteCluster, indices);
-                        }
-                        indices.add(remoteIndex);
-                    } else {
-                        localIndicesList.add(index);
-                    }
-                } else {
-                    localIndicesList.add(index);
-                }
-            }
-            localIndices = localIndicesList.toArray(new String[localIndicesList.size()]);
+            remoteIndicesByCluster = new HashMap<>();
+            localIndices = remoteClusterService.filterIndices(remoteIndicesByCluster);
         } else {
+            remoteIndicesByCluster = Collections.emptyMap();
             localIndices = searchRequest.indices();
         }
 
@@ -165,61 +134,18 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
                 (nodeId) -> null, Collections.emptyMap(), listener);
         } else {
-            remoteClusterService.sendSearchShards(searchRequest, remoteIndicesByCluster,
+            remoteClusterService.collectSearchShards(searchRequest, remoteIndicesByCluster,
                 ActionListener.wrap((searchShardsResponses) -> {
                     List<ShardIterator> remoteShardIterators = new ArrayList<>();
                     Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
-                    Function<String, Transport.Connection> connectionFunction = processRemoteShards(searchShardsResponses,
-                        remoteShardIterators, remoteAliasFilters);
+                    Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
+                        searchShardsResponses, remoteShardIterators, remoteAliasFilters);
                     executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators,
                         connectionFunction, remoteAliasFilters, listener);
                 }, listener::onFailure));
         }
     }
 
-    private Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
-                                     List<ShardIterator> remoteShardIterators,
-                                     Map<String, AliasFilter> aliasFilterMap) {
-        Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
-        for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
-            String clusterName = entry.getKey();
-            ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
-            for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
-                nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterName));
-            }
-            Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
-            for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
-                //add the cluster name to the remote index names for indices disambiguation
-                //this ends up in the hits returned with the search response
-                ShardId shardId = clusterSearchShardsGroup.getShardId();
-                Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + shardId.getIndex().getName(),
-                        shardId.getIndex().getUUID());
-                ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()),
-                        Arrays.asList(clusterSearchShardsGroup.getShards()));
-                remoteShardIterators.add(shardIterator);
-                AliasFilter aliasFilter;
-                if (indicesAndFilters == null) {
-                    //TODO this section is returned only by 5.1+ nodes. With 5.0.x nodes we should rather retrieve the alias filters
-                    //using another api. What we do now causes the remote alias filters to be ignored whenever the node that we
-                    //called search shards against was on 5.0.x.
-                    aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
-                } else {
-                    aliasFilter = indicesAndFilters.get(shardId.getIndexName());
-                    assert aliasFilter != null;
-                }
-                // here we have to map the filters to the UUID since from now on we use the uuid for the lookup
-                aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
-            }
-        }
-        return (nodeId) -> {
-            Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
-            if (supplier == null) {
-                throw new IllegalArgumentException("unknown remote node: " + nodeId);
-            }
-            return supplier.get();
-        };
-    }
-
     private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
                                List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
                                Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener) {

+ 2 - 0
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -256,6 +256,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
                     RemoteClusterService.REMOTE_CLUSTERS_SEEDS,
                     RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
+                    RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
+                RemoteClusterServic.REMOTE_NODE_ATTRIBUTE
                     TransportService.TRACE_LOG_EXCLUDE_SETTING,
                     TransportService.TRACE_LOG_INCLUDE_SETTING,
                     TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,

+ 6 - 3
core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

@@ -137,10 +137,13 @@ public class SearchAsyncActionTests extends ESTestCase {
         latch.await();
         assertNotNull(response.get());
         assertFalse(nodeToContextMap.isEmpty());
-        assertTrue(nodeToContextMap.containsKey(primaryNode));
+        assertTrue(nodeToContextMap.toString(), nodeToContextMap.containsKey(primaryNode) || nodeToContextMap.containsKey(replicaNode));
         assertEquals(shardsIter.size(), numFreedContext.get());
-        assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty());
-
+        if (nodeToContextMap.containsKey(primaryNode)) {
+            assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty());
+        } else {
+            assertTrue(nodeToContextMap.get(replicaNode).toString(), nodeToContextMap.get(replicaNode).isEmpty());
+        }
     }
 
     private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode,

+ 0 - 24
core/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java

@@ -20,38 +20,19 @@
 package org.elasticsearch.search.simple;
 
 import org.apache.lucene.util.Constants;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.discovery.zen.PublishClusterStateActionTests;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.rescore.QueryRescorerBuilder;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.discovery.TestZenDiscovery;
-import org.elasticsearch.transport.MockTcpTransportPlugin;
 
-import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -72,11 +53,6 @@ import static org.hamcrest.Matchers.containsString;
 
 public class SimpleSearchIT extends ESIntegTestCase {
 
-    @Override
-    protected boolean useSearchProxyNode() {
-        return true;
-    }
-
     public void testSearchNullIndex() {
         expectThrows(NullPointerException.class,
                 () -> client().prepareSearch((String) null).setQuery(QueryBuilders.termQuery("_id", "XXX1")).get());

+ 0 - 111
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -30,7 +30,6 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.ShardOperationFailedException;
@@ -55,24 +54,19 @@ import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.ClearScrollResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.FilterClient;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.cluster.ClusterModule;
-import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -83,10 +77,8 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkModule;
@@ -98,7 +90,6 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -126,10 +117,7 @@ import org.elasticsearch.indices.IndicesQueryCache;
 import org.elasticsearch.indices.IndicesRequestCache;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.store.IndicesStore;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeMocksPlugin;
-import org.elasticsearch.node.NodeValidationException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.ScriptService;
@@ -150,7 +138,6 @@ import org.junit.BeforeClass;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UncheckedIOException;
 import java.lang.annotation.Annotation;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Inherited;
@@ -173,7 +160,6 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -350,7 +336,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
 
     private static ESIntegTestCase INSTANCE = null; // see @SuiteScope
     private static Long SUITE_SEED = null;
-    private static Node searchProxyNode;
 
     @BeforeClass
     public static void beforeClass() throws Exception {
@@ -381,9 +366,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
         cluster().beforeTest(random(), getPerTestTransportClientRatio());
         cluster().wipe(excludeTemplates());
         randomIndexTemplate();
-        if (useSearchProxyNode()) {
-            searchProxyNode = startSearchProxyNode();
-        }
     }
 
     private void printTestMessage(String message) {
@@ -556,7 +538,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
     protected final void afterInternal(boolean afterClass) throws Exception {
         boolean success = false;
         try {
-
             final Scope currentClusterScope = getCurrentClusterScope();
             clearDisruptionScheme();
             try {
@@ -600,8 +581,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 if (currentClusterScope == Scope.TEST) {
                     clearClusters(); // it is ok to leave persistent / transient cluster state behind if scope is TEST
                 }
-                IOUtils.close(searchProxyNode);
-                searchProxyNode = null;
             }
             success = true;
         } finally {
@@ -655,53 +634,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
         if (frequently()) {
             client = new RandomizingClient(client, random());
         }
-        if (searchProxyNode != null && randomBoolean()) {
-            client = new FilterClient(client) {
-                @Override
-                public SearchRequestBuilder prepareSearch(String... indices) {
-                    return searchProxyNode.client().prepareSearch(convertToRemoteIndices(indices));
-                }
-
-                private String[] convertToRemoteIndices(String[] indices) {
-                    if (Objects.requireNonNull(indices).length == 0) {
-                        return new String[] {"test_remote_cluster|_all"};
-                    }
-                    String [] remoteIndices = new String[indices.length];
-                    for (int i = 0; i < indices.length; i++) {
-                        remoteIndices[i] = "test_remote_cluster|"+ Objects.requireNonNull(indices[i]);
-                    }
-                    return remoteIndices;
-                }
-
-                @Override
-                public ActionFuture<SearchResponse> search(SearchRequest request) {
-                    // we copy the request to ensure we never modify the original request
-                    try (BytesStreamOutput out = new BytesStreamOutput()) {
-                        request.writeTo(out);
-                        SearchRequest copy = new SearchRequest();
-                        copy.readFrom(out.bytes().streamInput());
-                        copy.indices(convertToRemoteIndices(request.indices()));
-                        return searchProxyNode.client().search(copy);
-                    } catch (IOException ex) {
-                        throw new UncheckedIOException(ex);
-                    }
-                }
-
-                @Override
-                public void search(SearchRequest request, ActionListener<SearchResponse> listener) {
-                    // we copy the request to ensure we never modify the original request
-                    try (BytesStreamOutput out = new BytesStreamOutput()) {
-                        request.writeTo(out);
-                        SearchRequest copy = new SearchRequest();
-                        copy.readFrom(out.bytes().streamInput());
-                        copy.indices(convertToRemoteIndices(request.indices()));
-                        searchProxyNode.client().search(copy, listener);
-                    } catch (IOException ex) {
-                        throw new UncheckedIOException(ex);
-                    }
-                }
-            };
-        }
         return client;
     }
 
@@ -2271,47 +2203,4 @@ public abstract class ESIntegTestCase extends ESTestCase {
         String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
         return new Index(index, uuid);
     }
-
-    protected boolean useSearchProxyNode() {
-        return false; // nocommit - lets enable this globally
-    }
-
-    private synchronized Node startSearchProxyNode() {
-        if (isInternalCluster()) {
-            final DiscoveryNode seedNode = internalCluster().getInstance(ClusterService.class).localNode();
-            final Path tempDir = createTempDir();
-            Settings settings = Settings.builder()
-                .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "search_proxy_" + internalCluster().getClusterName())
-                .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
-                .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
-                .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
-                .put("search.remote.seeds.test_remote_cluster", seedNode.getAddress().toString())
-                .put("node.name", "node_prx_0")
-                .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
-                .put(NetworkModule.HTTP_ENABLED.getKey(), false)
-                .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)
-                .put(Node.NODE_DATA_SETTING.getKey(), false)
-                .put(Node.NODE_MASTER_SETTING.getKey(), true)
-                .put(Node.NODE_INGEST_SETTING.getKey(), false)
-                .build();
-            Collection<Class<? extends Plugin>> plugins = nodePlugins();
-            if (plugins.contains(MockTcpTransportPlugin.class) == false) {
-                plugins = new ArrayList<>(plugins);
-                plugins.add(MockTcpTransportPlugin.class);
-            }
-            if (plugins.contains(TestZenDiscovery.TestPlugin.class) == false) {
-                plugins = new ArrayList<>(plugins);
-                plugins.add(TestZenDiscovery.TestPlugin.class);
-            }
-            Node build = new MockNode(settings, plugins);
-            try {
-                build.start();
-            } catch (NodeValidationException e) {
-                throw new RuntimeException(e);
-            }
-            return build;
-        } else {
-            return null;
-        }
-    }
 }